package base_spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.util.concurrent.ConcurrentLinkedDeque

object ExStream {
    // Shared config for the batch/streaming word-count demos.
    // local[2]: streaming needs at least two threads — one to receive
    // data and one to process batches.
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local[2]")
        .setAppName("计算平均值")


    /**
     * Streaming word count: watches `path` for newly created text files,
     * splits each line on commas and prints per-batch word counts.
     *
     * Blocks forever on `awaitTermination()` — intended to be run as the
     * sole job of the process.
     *
     * @param path directory monitored by `textFileStream` (only files
     *             added after start are picked up)
     */
    def wordCountStream(path: String): Unit = {
        StreamLogger.setStreamingLogLevels()
        val ssc = new StreamingContext(conf, Seconds(20)) // 20-second micro-batches

        ssc.textFileStream(path)
            .flatMap(_.split(","))
            .map((_, 1))
            .reduceByKey(_ + _)
            .print()
        ssc.start()
        ssc.awaitTermination()
    }

    /**
     * Batch word count over comma-separated text files and prints the
     * resulting (word, count) pairs to stdout.
     *
     * @param path file or directory readable by `SparkContext.textFile`
     */
    def wordCount(path: String): Unit = {
        val sc = new SparkContext(conf)
        try {
            sc.textFile(path)
                .flatMap(_.split(","))
                .map((_, 1))
                .reduceByKey(_ + _)
                .collect()
                .foreach(println)
        } finally {
            // Fix: the original leaked the SparkContext. Stopping it lets the
            // JVM exit and allows another context to be created afterwards.
            sc.stop()
        }
    }

    /**
     * QueueStream demo: feeds 100 small RDDs (one per second) into a
     * streaming context with 2-second batches, counting values mod 10.
     */
    def rddStream(): Unit = {
        StreamLogger.setStreamingLogLevels()
        val sparkConf = new SparkConf().setAppName("TestRDDQueue").setMaster("local[*]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // Fix: SynchronizedQueue is deprecated (and removed in Scala 2.13).
        // Use a plain mutable.Queue and synchronize enqueues manually, as
        // Spark's own QueueStream example does.
        val rddQueue = new scala.collection.mutable.Queue[RDD[Int]]()
        val queueStream = ssc.queueStream(rddQueue)
        val mappedStream = queueStream.map(r => (r % 10, 1))
        val reducedStream = mappedStream.reduceByKey(_ + _)
        reducedStream.print()
        ssc.start()
        for (_ <- 1 to 100) {
            rddQueue.synchronized {
                rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
            }
            Thread.sleep(1000)
        }
        // Fix: stop gracefully so already-received batches finish processing,
        // and shut down the underlying SparkContext as well.
        ssc.stop(stopSparkContext = true, stopGracefully = true)
    }


    def main(args: Array[String]): Unit = {
        //        wordCountStream(base_spark.util.Utils.getResourcePath("/csv"))
        //        wordCount(base_spark.util.Utils.getResourcePath("/csv"))
        rddStream()

    }


}
